package org.topteam.oschat.akka;

import java.util.HashMap;
import java.util.Map;

import org.topteam.oschat.entity.User;
import org.topteam.oschat.event.SessionEvent;
import org.topteam.oschat.event.SessionEvent.SessionClose;
import org.topteam.oschat.event.SessionEvent.SessionOpen;
import org.topteam.oschat.message.TextMessage;
import org.topteam.oschat.service.IUserService;
import org.topteam.oschat.util.SpringUtils;

import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.ClusterSingletonProxy;

/**
 * Manages the per-user actors.
 * <p>
 * Handles three message types:
 * <ul>
 * <li>{@link SessionOpen} - starts the user's cluster singleton the first time
 * the user is seen by this actor, then passes the event on to it;</li>
 * <li>{@link SessionClose} - passed on to the singleton of the closing user;</li>
 * <li>{@link TextMessage} - passed on to the singleton of the recipient.</li>
 * </ul>
 * Any other message is silently ignored.
 * 
 * @author JiangFeng
 * 
 */
public class SessionActor extends UntypedActor {

	public static final String ACTOR_ID = "session";

	/**
	 * Value passed as the last argument of the singleton manager/proxy
	 * {@code defaultProps} factories (the cluster role in the Akka contrib API).
	 */
	private static final String CLUSTER_ROLE = "oschat";

	/**
	 * User ids for which this actor has already started a singleton manager,
	 * mapped to the proxy used to reach that singleton.
	 */
	private final Map<String, ActorRef> users = new HashMap<>();

	/**
	 * One singleton proxy per user id, created lazily by
	 * {@link #getRemoteActor(String)} and reused afterwards so that forwarding
	 * a message does not spawn a new child actor every time.
	 */
	private final Map<String, ActorRef> proxies = new HashMap<>();

	// NOTE(review): assigned in the constructor but not read anywhere in this
	// class - confirm whether it is still needed.
	private IUserService userService;

	public SessionActor() {
		userService = SpringUtils.getBean(IUserService.class);
	}

	/**
	 * Dispatches session events and chat messages to the matching user actor.
	 * 
	 * @param message
	 *            the received message; only {@link SessionOpen},
	 *            {@link SessionClose} and {@link TextMessage} are acted upon
	 */
	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof SessionEvent.SessionOpen) {
			// A new ws connection: create the user actor if it does not exist
			// yet, then notify it.
			SessionOpen open = (SessionOpen) message;
			User user = open.getUser();
			String userId = user.getUserId();
			ActorRef userAc = users.get(userId);
			if (userAc == null) {
				startUserSingleton(user);
				userAc = getRemoteActor(userId);
				// NOTE(review): the proxy is passed as its own sender here
				// (the other branches use getSelf()); kept as in the original
				// - confirm whether UserActor relies on the sender.
				userAc.tell(open, userAc);
				users.put(userId, userAc);
				// NOTE(review): Terminated is not handled in onReceive, so a
				// stopped proxy is never evicted from the maps.
				getContext().watch(userAc);
			} else {
				userAc.tell(open, userAc);
			}
		} else if (message instanceof SessionEvent.SessionClose) {
			// A connection was closed: notify the corresponding UserActor.
			SessionClose close = (SessionClose) message;
			getRemoteActor(close.getUserId()).tell(close, getSelf());
		} else if (message instanceof TextMessage) {
			// Forward the chat message to its recipient.
			TextMessage msg = (TextMessage) message;
			getRemoteActor(msg.getTo()).tell(msg, getSelf());
		}
	}

	/**
	 * Starts the cluster singleton manager hosting the {@code UserActor} of the
	 * given user. The manager is created as a top-level actor named after the
	 * user id, and the singleton it manages carries the same name.
	 * 
	 * @param user
	 *            the user whose actor should be started
	 */
	private void startUserSingleton(User user) {
		String userId = user.getUserId();
		getContext().system().actorOf(
				ClusterSingletonManager.defaultProps(
						Props.create(UserActor.class, user), userId,
						PoisonPill.getInstance(), CLUSTER_ROLE), userId);
	}

	/**
	 * Returns the proxy used to reach a user's singleton actor in the cluster,
	 * addressed by the akka path {@code user/<id>/<id>}.
	 * <p>
	 * The proxy is created as a child of this actor on first use and cached, so
	 * repeated calls with the same id return the same {@link ActorRef}.
	 * 
	 * @param id
	 *            the user id
	 * @return the cached or newly created singleton proxy for that user
	 */
	private ActorRef getRemoteActor(String id) {
		ActorRef proxy = proxies.get(id);
		if (proxy == null) {
			proxy = getContext().actorOf(
					ClusterSingletonProxy.defaultProps("user/" + id + "/" + id,
							CLUSTER_ROLE));
			proxies.put(id, proxy);
		}
		return proxy;
	}
}
